package com.atguigu.flink.datastreamapi.divide;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demonstrates splitting one stream into two by applying complementary filters.
 *
 * <p>Text lines are read from a socket and mapped to {@link WaterSensor} records.
 * The resulting stream is then filtered twice: records whose id is {@code "s1"}
 * are printed to standard output, and every other record is printed to standard error.
 *
 * <p>Created by Smexy on 2023/11/13
 */
public class Demo1_FilterDivide
{
    /**
     * Builds the pipeline and runs the job.
     *
     * @param args command-line arguments; not used
     * @throws IllegalStateException if job execution fails; the original failure is
     *                               attached as the cause
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<WaterSensor> ds = env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction());

        // Split the records of sensor "s1" into their own stream and print them
        // to standard output.
        ds
            .filter(ws -> "s1".equals(ws.getId()))
            .print();

        // Every other sensor (including records with a null id) forms the second
        // stream, printed to standard error so the two outputs are easy to tell apart.
        ds
            .filter(ws -> !"s1".equals(ws.getId()))
            .printToErr();

        try {
            env.execute();
        } catch (Exception e) {
            // Propagate the failure instead of only printing it, so the process
            // does not exit normally after a failed job.
            throw new IllegalStateException("Flink job execution failed", e);
        }

    }
}
